package store

import (
	"context"
	"encoding/json"
	baseLog "gitee.com/zaiqiang231/go-base-app/base_app/log"
	"github.com/go-redis/redis/v8"
	"sync"
	"time"
)

// Loader is the reload target driven by a Looper.
type Loader interface {
	// Reload is invoked by Looper.DoReload while the Looper's lock is held.
	// A non-nil error is logged there and not propagated any further.
	Reload() error
}

// Looper triggers reloads of a Loader: once from Start, and afterwards from a
// periodic loop whenever reload requests have been queued by the Redis
// notification handler.
type Looper struct {
	// ctx bounds the lifetime of the background loops; each of them returns
	// once ctx is done.
	ctx    context.Context
	// lock is taken exclusively around every call to loader.Reload.
	lock   *sync.RWMutex
	// loader is the target whose Reload method DoReload invokes.
	loader Loader
}

// Notification is the JSON payload decoded from a Redis pub/sub message.
type Notification struct {
	// Command names the event. handleRedisEvent queues a reload for
	// "PolicyChanged" and "SecretChanged" and logs a warning for anything else.
	Command string `json:"command"`
}

// NewLooper builds a Looper bound to ctx that will reload the given loader.
// The returned Looper owns a fresh RWMutex; nothing runs until Start is called.
func NewLooper(ctx context.Context, loader Loader) *Looper {
	looper := new(Looper)
	looper.ctx = ctx
	looper.lock = &sync.RWMutex{}
	looper.loader = loader
	return looper
}

// reloadQueue is an unbuffered channel carrying one completion callback
// (possibly nil) per reload request. handleRedisEvent sends on it and
// reloadQueueLoop receives from it.
var reloadQueue = make(chan func())

// requeueLock guards requeue.
var requeueLock sync.Mutex

// requeue accumulates the callbacks of reload requests that have not been
// served yet. reloadQueueLoop appends to it and shouldReload drains it.
var requeue []func()

// Start spawns the three background loops (Redis subscription, request
// queueing, periodic reload check) and then performs one synchronous reload
// so the cache is populated before Start returns.
func (l *Looper) Start() {
	background := []func(){
		l.startPubSubLoop,
		l.reloadQueueLoop,
		func() { l.reloadLoop() },
	}
	for _, run := range background {
		go run()
	}

	// Initial cache load.
	l.DoReload()
}

// reloadQueueLoop moves reload requests from the reloadQueue channel into the
// requeue slice, where the periodic reload loop picks them up. It returns
// when the Looper's context is done.
func (l *Looper) reloadQueueLoop() {
	done := l.ctx.Done()
	for {
		select {
		case callback := <-reloadQueue:
			// Append under the lock; shouldReload swaps the slice out
			// concurrently.
			requeueLock.Lock()
			requeue = append(requeue, callback)
			requeueLock.Unlock()
		case <-done:
			return
		}
	}
}

// startPubSubLoop subscribes to the cluster notification channel on Redis and
// forwards every received message to handleRedisEvent. It returns when the
// Looper's context is done, when the initial Receive fails, or when the
// message channel is closed.
func (l *Looper) startPubSubLoop() {
	pubsub := GetRedisStore().Subscribe(l.ctx, "lovesport.cluster.notifications")
	defer pubsub.Close()

	// Block until the first reply arrives so subscription errors surface here
	// rather than being silently retried inside the message channel.
	if _, err := pubsub.Receive(l.ctx); err != nil {
		baseLog.Errorf("Error while receiving pubsub message: %s", err.Error())
		return
	}

	// Fetch the channel once instead of on every loop iteration.
	messages := pubsub.Channel()
	for {
		select {
		case <-l.ctx.Done():
			return
		case msg, ok := <-messages:
			if !ok {
				// A closed channel yields nil messages forever; bail out
				// instead of spinning and handing nil to the handler.
				baseLog.Warnf("pubsub message channel closed, stopping notification loop")
				return
			}
			handleRedisEvent(msg, nil)
		}
	}
}

// handleRedisEvent decodes a Redis pub/sub message and, for a recognised
// command, queues a reload request.
//
// v must be a non-nil *redis.Message; any other value is ignored. reloaded is
// the callback to run after the resulting reload has completed and may be nil.
//
// Note that the send on reloadQueue is unbuffered, so this call blocks until
// reloadQueueLoop receives the request.
func handleRedisEvent(v interface{}, reloaded func()) {
	message, ok := v.(*redis.Message)
	// A typed nil pointer still satisfies the type assertion, so guard against
	// it explicitly before touching message.Payload.
	if !ok || message == nil {
		return
	}

	var notif Notification
	if err := json.Unmarshal([]byte(message.Payload), &notif); err != nil {
		baseLog.Errorf("Unmarshalling message body failed, malformed: %v", err)
		return
	}

	switch notif.Command {
	case "PolicyChanged", "SecretChanged":
		reloadQueue <- reloaded
	default:
		baseLog.Warnf("Unknown notification command: %q", notif.Command)
	}
}

// reloadLoop checks once per second whether any reload requests are pending
// and, if so, performs a single reload on behalf of all of them. After the
// reload it runs every non-nil callback that was queued with those requests.
//
// complete is optional; only its first element is used. When that element is
// non-nil it is called at the end of every reload cycle.
//
// The loop returns when the Looper's context is done.
func (l *Looper) reloadLoop(complete ...func()) {
	ticker := time.NewTicker(1 * time.Second)
	// Stop the ticker on exit so it does not outlive the loop.
	defer ticker.Stop()

	for {
		select {
		case <-l.ctx.Done():
			return
		// There is no check before the first tick: Start already performs an
		// initial reload, so polling only needs to begin one interval later.
		case <-ticker.C:
			cb, ok := shouldReload()
			if !ok {
				continue
			}
			start := time.Now()
			l.DoReload()
			for _, c := range cb {
				// Most queued callbacks are nil; skip them to avoid a
				// nil-function panic.
				if c != nil {
					c()
				}
			}
			// Apply the same nil protection to the caller-supplied hook.
			if len(complete) != 0 && complete[0] != nil {
				complete[0]()
			}
			baseLog.Infof("reload: cycle completed in %v", time.Since(start))
		}
	}
}

// shouldReload reports whether any reload requests are pending. When there
// are, it hands back their callbacks and resets the pending list, so each
// request is served by exactly one reload cycle. With nothing pending it
// returns (nil, false) and leaves the list untouched.
func shouldReload() ([]func(), bool) {
	requeueLock.Lock()
	defer requeueLock.Unlock()

	pending := requeue
	if len(pending) > 0 {
		requeue = []func(){}
		return pending, true
	}
	return nil, false
}

// DoReload runs the loader's Reload while holding the Looper's write lock, so
// reloads triggered from different goroutines never overlap. A failure is
// logged and otherwise swallowed.
func (l *Looper) DoReload() {
	l.lock.Lock()
	defer l.lock.Unlock()

	err := l.loader.Reload()
	if err == nil {
		return
	}
	baseLog.Errorf("faild to refresh target storage: %s", err.Error())
}
